package com.hanyi.web.controller.task;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.net.NetUtil;
import cn.hutool.extra.spring.SpringUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.hanyi.common.core.domain.entity.SysDept;
import com.hanyi.common.utils.DateUtils;
import com.hanyi.common.utils.redis.RedisUtils;
import com.hanyi.runner.HanYiApplicationRunner;
import com.hanyi.system.domain.*;
import com.hanyi.system.domain.bo.SysLocalTaskBo;
import com.hanyi.system.domain.bo.SysRealtimeTaskBo;
import com.hanyi.system.domain.vo.SysLocalTaskVo;
import com.hanyi.system.domain.vo.SysRealtimeTaskVo;
import com.hanyi.system.service.ISysAnalysisResultService;
import com.hanyi.system.service.ISysDeptService;
import com.hanyi.system.service.ISysLocalTaskService;
import com.hanyi.system.service.ISysRealtimeTaskService;
import com.hanyi.thirdparty.*;
import com.hanyi.thirdparty.domain.ChannelInfo;
import com.hanyi.thirdparty.domain.StreamConfig;
import com.hanyi.thirdparty.service.IAlgorithmService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import javax.annotation.PostConstruct;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * Scheduler that starts, stops and monitors algorithm analysis tasks.
 *
 * <p>A background thread (see {@link #start()}) first pushes the global upload
 * configuration to the algorithm service, then polls the service's channel list
 * every few seconds. After each successful poll it walks the real-time and local
 * task lists, synchronises the persisted channel status and starts or stops
 * time-planned tasks according to their start/end time.
 *
 * <p>The number of concurrently open channels is limited to
 * {@link #MAX_CHANNEL_COUNT}.
 */
@Component
public class ScheduleTask implements Runnable{
    private static final Logger log = LoggerFactory.getLogger(ScheduleTask.class);

    private final static String IS_TIME_PLAN = "Y";
    private final static String TASK_INIT="0";
    private final static String TASK_RUNNING="1";
    private final static String TASK_TERMINATED="2";
    private final static String TASK_ERROR="3";

    /** Maximum number of channels that may be open at the same time. */
    private static final int MAX_CHANNEL_COUNT = 8;
    /** Channel status written to a task when the service reports no channel for it. */
    private static final int CHANNEL_STATUS_ABSENT = 0;
    /** Channel status values at or above this are treated as "channel closed". */
    private static final int CHANNEL_STATUS_CLOSED_MIN = 4;
    /** Stream type sent to the algorithm service for real-time (RTSP url) tasks. */
    private static final int STREAM_TYPE_REALTIME = 0;
    /** Stream type sent to the algorithm service for local video file tasks. */
    private static final int STREAM_TYPE_LOCAL = 1;

    /** Pause between two attempts to push the global upload configuration. */
    private static final long SETTING_RETRY_INTERVAL_MS = 3000L;
    /** Length of one scheduler tick. */
    private static final long TICK_INTERVAL_MS = 1000L;
    /** The channel list is polled once every this many ticks. */
    private static final long CHANNEL_POLL_PERIOD_TICKS = 3L;

    private long timeCount;
    private volatile Thread task;
    private volatile boolean stop=false;
    private volatile int currentTaskCount = 0;

    private List<SysRealtimeTask> realtimeTaskVoList;
    private List<SysLocalTask> localTaskVoList;

    /** Address the algorithm service uploads detection results to. */
    private final String uploadUrl;

    /** Channels reported by the last successful poll, keyed by task id. */
    private final Map<String,ChannelInfo> channelInfoMap = new ConcurrentHashMap<>();
    private volatile CheckAllChannelInfoResponse channelInfo;

    @Autowired
    private ISysRealtimeTaskService iSysRealtimeTaskService;
    @Autowired
    private ISysLocalTaskService iSysLocalTaskService;

    @Autowired
    private IAlgorithmService iAlgorithmService;


    /**
     * Builds the result upload address from the local host address.
     */
    private ScheduleTask(){
        String localIP =  NetUtil.getLocalhostStr();
        // NOTE(review): the address carries no "http://" scheme — confirm that the
        // algorithm service accepts a bare host:port/path value.
        uploadUrl = localIP+":8080"+"/system/analysisResult/event/result";
    }

    /**
     * Runs one scheduling pass over all real-time and local tasks.
     *
     * <p>For every task the persisted channel status is synchronised with the
     * cached channel list. Time-planned tasks are then handled as follows:
     * <ul>
     *   <li>start &lt; now &lt; end and status is "init": the task is started and,
     *       on success, marked as running;</li>
     *   <li>otherwise (before start or after end) and status is "running": the
     *       channel is closed and the task is marked as terminated.</li>
     * </ul>
     * Tasks that are not time-planned and are marked as running get their channel
     * closed once the service reports a status of
     * {@link #CHANNEL_STATUS_CLOSED_MIN} or higher.
     */
    private void scheduleTimeTask(){
        // Load the current task lists.
        realtimeTaskVoList = iSysRealtimeTaskService.queryList();
        localTaskVoList = iSysLocalTaskService.queryList();

        long now = System.currentTimeMillis();

        if(realtimeTaskVoList != null){
            for(SysRealtimeTask realtimeTask:realtimeTaskVoList){
                scheduleRealtimeTask(realtimeTask, now);
            }
        }

        if(localTaskVoList != null){
            for(SysLocalTask localTask:localTaskVoList){
                scheduleLocalTask(localTask, now);
            }
        }
    }

    /** Synchronises channel status and applies the scheduling rules to one real-time task. */
    private void scheduleRealtimeTask(SysRealtimeTask realtimeTask, long now){
        ChannelInfo channel = channelInfoMap.get(String.valueOf(realtimeTask.getTaskId()));

        // Persist the channel status only when it actually changed.
        if(channel != null){
            if(realtimeTask.getChannelStatus()!=channel.getStatus()){
                realtimeTask.setChannelStatus(channel.getStatus());
                iSysRealtimeTaskService.updateByEntity(realtimeTask);
            }
        }else if(realtimeTask.getChannelStatus()!=CHANNEL_STATUS_ABSENT){
            realtimeTask.setChannelStatus(CHANNEL_STATUS_ABSENT);
            iSysRealtimeTaskService.updateByEntity(realtimeTask);
        }

        if(!IS_TIME_PLAN.equals(realtimeTask.getIsTimePlan())){
            // Manually operated task: only clean up channels the service reports as closed.
            if(TASK_RUNNING.equals(realtimeTask.getStatus()) && isChannelClosed(channel)){
                stopTask(realtimeTask.getTaskId());
            }
            return;
        }

        if(realtimeTask.getStartTime() == null || realtimeTask.getEndTime() == null){
            log.warn("realtime plan task {} has no start or end time, skipped", realtimeTask.getTaskId());
            return;
        }
        long startTime = realtimeTask.getStartTime().getTime();
        long endTime = realtimeTask.getEndTime().getTime();
        log.info("realtime plan task:{},endTime:{},status:{}", startTime, endTime, realtimeTask.getStatus());

        if(now>startTime && now<endTime){
            // Inside the planned window: start the task if it has not been started yet.
            if(TASK_INIT.equals(realtimeTask.getStatus())
                    && startRealTimeTask(realtimeTask.getTaskId(),realtimeTask.getMainStreamUrl())){
                persistRealtimeStatus(realtimeTask, TASK_RUNNING);
            }
        }else if(TASK_RUNNING.equals(realtimeTask.getStatus())){
            // Outside the planned window: close the channel and mark the task terminated.
            stopTask(realtimeTask.getTaskId());
            realtimeTask.setStatus(TASK_TERMINATED);
            persistRealtimeStatus(realtimeTask, TASK_TERMINATED);
        }
    }

    /** Synchronises channel status and applies the scheduling rules to one local task. */
    private void scheduleLocalTask(SysLocalTask localTask, long now){
        ChannelInfo channel = channelInfoMap.get(String.valueOf(localTask.getTaskId()));

        // Persist the channel status only when it actually changed.
        if(channel != null){
            if(localTask.getChannelStatus()!=channel.getStatus()){
                localTask.setChannelStatus(channel.getStatus());
                iSysLocalTaskService.updateByEntity(localTask);
            }
        }else if(localTask.getChannelStatus()!=CHANNEL_STATUS_ABSENT){
            localTask.setChannelStatus(CHANNEL_STATUS_ABSENT);
            iSysLocalTaskService.updateByEntity(localTask);
        }

        if(!IS_TIME_PLAN.equals(localTask.getIsTimePlan())){
            // Manually operated task: only clean up channels the service reports as closed.
            if(TASK_RUNNING.equals(localTask.getStatus()) && isChannelClosed(channel)){
                stopTask(localTask.getTaskId());
            }
            return;
        }

        if(localTask.getStartTime() == null || localTask.getEndTime() == null){
            log.warn("local plan task {} has no start or end time, skipped", localTask.getTaskId());
            return;
        }
        long startTime = localTask.getStartTime().getTime();
        long endTime = localTask.getEndTime().getTime();

        if(now>startTime && now<endTime){
            // Inside the planned window: start the task if it has not been started yet.
            if(TASK_INIT.equals(localTask.getStatus())
                    && startLocalTask(localTask.getTaskId(),localTask.getVideoPath())){
                persistLocalStatus(localTask, TASK_RUNNING);
            }
        }else if(TASK_RUNNING.equals(localTask.getStatus())){
            // Outside the planned window: close the channel and mark the task terminated.
            stopTask(localTask.getTaskId());
            localTask.setStatus(TASK_TERMINATED);
            persistLocalStatus(localTask, TASK_TERMINATED);
        }
    }

    /** Returns true when a channel exists and its status is in the "closed" range. */
    private static boolean isChannelClosed(ChannelInfo channel){
        return channel != null && channel.getStatus() >= CHANNEL_STATUS_CLOSED_MIN;
    }

    /** Re-reads the real-time task by id and stores the given status on the fresh copy. */
    private void persistRealtimeStatus(SysRealtimeTask realtimeTask, String status){
        SysRealtimeTask fresh = iSysRealtimeTaskService.queryByTaskId(realtimeTask.getTaskId());
        if(fresh == null){
            log.warn("realtime task {} not found, status {} not saved", realtimeTask.getTaskId(), status);
            return;
        }
        fresh.setStatus(status);
        iSysRealtimeTaskService.updateByEntity(fresh);
    }

    /** Re-reads the local task by id and stores the given status on the fresh copy. */
    private void persistLocalStatus(SysLocalTask localTask, String status){
        SysLocalTask fresh = iSysLocalTaskService.queryByTaskId(localTask.getTaskId());
        if(fresh == null){
            log.warn("local task {} not found, status {} not saved", localTask.getTaskId(), status);
            return;
        }
        fresh.setStatus(status);
        iSysLocalTaskService.updateByEntity(fresh);
    }


    /**
     * Scheduler loop.
     *
     * <p>Retries {@link #setGlobalSetting()} until it succeeds, then polls the
     * channel list every {@link #CHANNEL_POLL_PERIOD_TICKS} ticks and runs a
     * scheduling pass after each successful poll. The loop ends when
     * {@link #stop()} is called or the running thread is interrupted; all
     * channels from the last poll are then closed.
     */
    @Override
    public void run() {
        log.info("调度任务开始------------------------------------");
        timeCount=0;
        boolean interrupted = false;

        // Setup phase: retry until the service accepts the configuration or we are stopped.
        while(!stop && !Thread.currentThread().isInterrupted() && !trySetGlobalSetting()){
            try {
                Thread.sleep(SETTING_RETRY_INTERVAL_MS);
            } catch (InterruptedException e) {
                interrupted = true;
                stop = true;
            }
        }

        // Use the current thread rather than the 'task' field: stop() may null the
        // field at any time, and run() may also be invoked without start().
        while(!stop && !Thread.currentThread().isInterrupted()){
            if(timeCount%CHANNEL_POLL_PERIOD_TICKS==0){
                try {
                    refreshChannelsAndSchedule();
                } catch (RuntimeException e) {
                    // One failing pass must not end the scheduler thread.
                    log.error("Scheduling pass failed, will retry on next poll", e);
                }
            }

            try {
                Thread.sleep(TICK_INTERVAL_MS);
            } catch (InterruptedException e) {
                interrupted = true;
                stop =true;
            }
            timeCount++;
        }

        // Clear a pending interrupt so the close requests below run with the same
        // thread state on every exit path; it is restored afterwards.
        if(Thread.interrupted()){
            interrupted = true;
        }

        // The scheduler is stopping: close every channel seen in the last poll.
        for(ChannelInfo info:channelInfosOf(channelInfo)){
            if(info == null){
                continue;
            }
            try {
                closeChannel(info);
            } catch (RuntimeException e) {
                log.error("Failed to close channel for task {}", info.getTask_id(), e);
            }
        }

        log.info("调度任务退出----------------------------------------------");

        if(interrupted){
            Thread.currentThread().interrupt();
        }
    }

    /** Calls {@link #setGlobalSetting()} and treats a thrown exception as a failed attempt. */
    private boolean trySetGlobalSetting(){
        try {
            return setGlobalSetting();
        } catch (RuntimeException e) {
            log.error("setGlobalSetting failed, will retry", e);
            return false;
        }
    }

    /**
     * Polls the channel list, rebuilds the channel cache and, when the poll
     * succeeded, runs one scheduling pass. On a failed poll the cache is left
     * empty and no scheduling happens.
     */
    private void refreshChannelsAndSchedule(){
        CheckAllChannelInfoResponse latest = iAlgorithmService.checkChannelInfo();
        channelInfo = latest;
        channelInfoMap.clear();
        if(latest == null || latest.getCode()!=0){
            log.error("Failed to get channel infos,need to retry...{}", timeCount);
            return;
        }
        for(ChannelInfo info:channelInfosOf(latest)){
            // ConcurrentHashMap rejects null keys, so entries without a task id are skipped.
            if(info != null && info.getTask_id() != null){
                channelInfoMap.put(info.getTask_id(),info);
            }
        }
        scheduleTimeTask();
    }

    /** Returns the channel list of a response, or an empty list when any part is missing. */
    private static List<ChannelInfo> channelInfosOf(CheckAllChannelInfoResponse response){
        if(response == null || response.getData() == null
                || response.getData().getChannel_infos() == null){
            return Collections.emptyList();
        }
        return response.getData().getChannel_infos();
    }


    /** Starts the scheduler on a new thread. */
    public void start(){
        stop = false;
        task = new Thread(this);
        task.start();
    }

    /** Requests the scheduler loop to end and interrupts its thread if one was started. */
    public void stop(){
        stop=true;
        Thread worker = task;
        if(worker!=null){
            worker.interrupt();
            task = null;
        }
    }

    /**
     * @return the response of the most recent channel poll; may be {@code null}
     *         or a failed response
     */
    public CheckAllChannelInfoResponse getChannelInfo() {
        return channelInfo;
    }

    public int getCurrentTaskCount() {
        return currentTaskCount;
    }

    public void setCurrentTaskCount(int currentTaskCount) {
        this.currentTaskCount = currentTaskCount;
    }

    /**
     * Sends the global upload configuration (storage directory, upload address
     * and which result data to upload) to the algorithm service.
     *
     * @return {@code true} when the service answered with code 0, {@code false}
     *         when it answered with another code or returned no response
     */
    public boolean setGlobalSetting(){

        SetGlobalUploadConfigRequest req = new SetGlobalUploadConfigRequest();
        req.setStorage_dir("/data/images/");
        req.setUpload_address(uploadUrl);
        req.setUpload_type(0);
        req.setUpload_image_type(1);
        req.setUpload_frame_results(false);
        SetGlobalUploadConfigRequest.UploadType uploadType = new SetGlobalUploadConfigRequest.UploadType();
        uploadType.setResult(true);
        uploadType.setCrop_image(true);
        uploadType.setFull_image(true);
        req.setUpload_datas(uploadType);
        log.info("setup global settings:");
        BaseResponse resp = iAlgorithmService.setGlobalUploadConfig(req);
        if(resp!=null){
            return resp.getCode()==0;
        }

        return false;
    }

    /**
     * Asks the algorithm service to open a channel for a real-time stream.
     *
     * @param taskId  id of the task the channel belongs to
     * @param rtspUrl stream url passed to the service
     * @return {@code true} when the service accepted the request (code 0);
     *         {@code false} when the channel limit is reached, a channel for the
     *         task is already cached, or the service refused or did not answer
     */
    public boolean startRealTimeTask(long taskId,String rtspUrl){
        log.info("try to start real time task");
        return openChannel(taskId, rtspUrl, STREAM_TYPE_REALTIME);
    }


    /**
     * Asks the algorithm service to open a channel for a local video file.
     *
     * @param taskId    id of the task the channel belongs to
     * @param videoPath video location passed to the service
     * @return {@code true} when the service accepted the request (code 0);
     *         {@code false} when the channel limit is reached, a channel for the
     *         task is already cached, or the service refused or did not answer
     */
    public boolean startLocalTask(long taskId,String videoPath){
        log.info("request to startLocalTask");
        return openChannel(taskId, videoPath, STREAM_TYPE_LOCAL);
    }

    /** Shared implementation of the two start methods; they differ only in stream type and url. */
    private boolean openChannel(long taskId, String url, int streamType){
        String taskKey = String.valueOf(taskId);

        // NOTE(review): the limit is checked against the cache of the last poll, so
        // channels opened since then are not counted until the next poll.
        if(channelInfoMap.size() >= MAX_CHANNEL_COUNT){
            log.warn("To max channel count limit 8.try again later");
            return false;
        }

        if(channelInfoMap.containsKey(taskKey)){
            log.warn("channel already exist running.");
            return false;
        }

        AddChannelRequest request = new AddChannelRequest();
        request.setAlg_flag(8);
        request.setTask_id(taskKey);
        request.setKeep_frame(false);
        request.setUpload_address(uploadUrl);

        StreamConfig streamConfig = new StreamConfig();
        streamConfig.setType(streamType);
        streamConfig.setUrl(url);
        streamConfig.setLoop_play(false);
        streamConfig.setSpeed_rate(1);
        streamConfig.setTransfer_type("tcp");
        streamConfig.setPump_frame_rate(5);

        request.setStream_config(streamConfig);

        AddChannelResponse resp = iAlgorithmService.addChannel(request);
        if(resp == null){
            log.warn("addChannel returned no response for task {}", taskKey);
            return false;
        }
        return resp.getCode()==0;
    }

    /**
     * Closes the cached channel of a task, if there is one. The cache entry itself
     * is not removed here; the cache is rebuilt on the next channel poll.
     *
     * @param taskId id of the task whose channel should be closed
     */
    public void stopTask(long taskId){
        log.info("request to stop task {}", taskId);
        ChannelInfo cached = channelInfoMap.get(String.valueOf(taskId));
        if(cached!=null){
            closeChannel(cached);
        }
    }

    /** Sends a close request for the given channel to the algorithm service. */
    private void closeChannel(ChannelInfo info){
        CloseChannelRequest closeChannelRequest = new CloseChannelRequest();
        closeChannelRequest.setChannel_id(info.getChannel_id());
        closeChannelRequest.setTask_id(info.getTask_id());
        iAlgorithmService.closeChannel(closeChannelRequest);
    }

}
